package org.ghost.springboot2.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.ghost.springboot2.demo.common.constant.SpringProfileConstant;
import org.ghost.springboot2.demo.config.RocketMqConfig;
import org.ghost.springboot2.demo.config.RocketMqProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;

/**
 * @author Administrator
 */
@Slf4j
@Component
@Profile(SpringProfileConstant.PROFILE_CONSUMER_PUSH_ORDERLY)
public class DefaultPushOrderlyConsumer {
    @Autowired
    private RocketMqProperties rocketMqProperties;

    @PostConstruct
    public void init() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rocketMqProperties.getGroupName());
        consumer.setNamesrvAddr(rocketMqProperties.getNameServerAddr());
        //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅主题和 标签（ * 代表所有标签)下信息
        consumer.subscribe(RocketMqConfig.TOPIC_ORDER, "*");
        //注册消费的监听 并在此监听中消费信息，并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerOrderly) (msgList, context) -> {
            //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
            context.setAutoCommit(true);
            // msgList中只收集同一个topic，同一个tag，并且key相同的message
            // 会把不同的消息分别放置到不同的队列中
            for (Message msg : msgList) {
                //消费者获取消息 这里只输出 不做后面逻辑处理
                String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                log.info("*****DefaultPushOrderlyConsumer-获取消息-topic={}，tags={}，keys={}，body={}", msg.getTopic(), msg.getTags(), msg.getKeys(), body);
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });

        consumer.start();
        System.out.println("=======DefaultPushOrderlyConsumer-消费者-启动成功=======");
    }
}
